[3/n] [Serve] Defer rank assignment after replica is allocated #58477
abrarsheikh merged 17 commits into master
Conversation
…58473)

### Summary
This PR refactors the replica rank system to support multi-dimensional ranking (global, node-level, and local ranks) in preparation for node-local rank tracking. The `ReplicaRank` object now contains three fields instead of being a simple integer, enabling better coordination of replicas across nodes.

### Motivation
Currently, Ray Serve only tracks a single global rank per replica. For advanced use cases like tensor parallelism, model sharding across nodes, and node-aware coordination, we need to track:
- **Global rank**: Replica's rank across all nodes (0 to N-1)
- **Node rank**: Which node the replica is on (0 to M-1)
- **Local rank**: Replica's rank on its specific node (0 to K-1)

This PR lays the groundwork by introducing the expanded `ReplicaRank` schema while keeping existing behavior backward compatible.

### Changes
#### Core Implementation
- **`schema.py`**: Extended `ReplicaRank` to include `node_rank` and `local_rank` fields (currently set to -1 as placeholders)
- **`replica.py`**: Updated replica actors to handle `ReplicaRank` objects
- **`context.py`**: Changed `ReplicaContext.rank` type from `Optional[int]` to `ReplicaRank`

### Current Behavior
- `node_rank` and `local_rank` are set to `-1` (placeholder values); this will change in a future PR
- Global rank assignment and management work as before
- All existing functionality is preserved

### Breaking Changes
Rank is changing from `int` to `ReplicaRank`

Next PR #58477

---------

Signed-off-by: abrar <abrar@anyscale.com>
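Since the rest of this thread builds on #58473, a minimal sketch of the expanded rank object it describes may help. The `node_rank` and `local_rank` field names come from the description above; the global field name `rank`, the defaults, and the use of a plain dataclass (rather than the actual Serve schema class) are assumptions for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReplicaRank:
    """Illustrative stand-in for the expanded rank object, not the real schema.

    rank: global rank across all replicas (0 to N-1).
    node_rank: rank of the node the replica runs on (0 to M-1);
        -1 placeholder until node-level tracking lands.
    local_rank: rank among replicas on the same node (0 to K-1);
        -1 placeholder until node-level tracking lands.
    """

    rank: int
    node_rank: int = -1
    local_rank: int = -1


# After the change, a replica's context carries the whole object rather than
# a bare int, e.g.:
r = ReplicaRank(rank=3)
assert (r.rank, r.node_rank, r.local_rank) == (3, -1, -1)
```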
Bug: Recovery Breaks Deferred Rank Assignment
During replica recovery after controller restart, initialize_and_get_metadata.remote() is called without passing a rank parameter (line 671), but the method signature now requires it. The _assign_rank_callback is never set during recovery since start() isn't called, and the rank assignment logic in check_ready() (lines 743-746) is skipped because _ready_obj_ref is already set by recover(). This causes recovered replicas to initialize without a rank, breaking the deferred rank assignment feature.
Referenced code: python/ray/serve/_private/deployment_state.py, lines 627 to 673 at commit d7131f0
Not true, rank is not required.
This is expected, because we want to fetch the rank from the already-running replica instead of assigning it a new rank during recovery.
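To make the two paths being debated here concrete, a rough sketch follows. The names `start(assign_rank_callback)`, `check_ready()`, `recover()`, `_allocated_obj_ref`, and `initialize_and_get_metadata` come from the PR discussion; the wrapper class itself and the metadata shape are illustrative assumptions, not the real `ActorReplicaWrapper` from deployment_state.py.

```python
from typing import Callable, Optional


class ReplicaWrapperSketch:
    """Illustrative contrast of fresh start vs. recovery, as argued above."""

    def __init__(self):
        self.rank: Optional[int] = None
        self._assign_rank_callback: Optional[Callable[[], int]] = None

    def start(self, assign_rank_callback: Callable[[], int]) -> None:
        # Fresh start: no rank yet. The callback is stored and only invoked
        # once the replica is actually allocated.
        self._assign_rank_callback = assign_rank_callback

    def on_allocated(self) -> None:
        # check_ready() path: _allocated_obj_ref has resolved, so assign now
        # and pass the rank into initialize_and_get_metadata(rank=...).
        self.rank = self._assign_rank_callback()

    def recover(self, running_replica_metadata: dict) -> None:
        # Controller restart: the replica is already initialized, so no
        # callback is set and no new rank is assigned; the controller simply
        # reads back the rank the running replica already holds.
        self.rank = running_replica_metadata.get("rank")
```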
zcin left a comment
what is the impact of this change on replica startup time?
I think this adds
Can we test / measure e2e
test application

profile code:

diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py
index 4b96833aff..829e0a84f0 100644
--- a/python/ray/serve/_private/deployment_state.py
+++ b/python/ray/serve/_private/deployment_state.py
@@ -283,6 +283,7 @@ class ActorReplicaWrapper:
# Outbound deployments polling state
self._outbound_deployments: Optional[List[DeploymentID]] = None
+ self._replica_start_ts: float = 0.0
@property
def replica_id(self) -> str:
@@ -460,7 +461,7 @@ class ActorReplicaWrapper:
self._deployment_is_cross_language = (
deployment_info.deployment_config.is_cross_language
)
-
+ self._replica_start_ts = time.time()
logger.info(
f"Starting {self.replica_id}.",
extra={"log_to_stderr": False},
@@ -790,6 +791,9 @@ class ActorReplicaWrapper:
)
return ReplicaStartupStatus.FAILED, repr(e)
+ time_taken = time.time() - self._replica_start_ts
+ logger.info(f"Replica {self._replica_id} started in {time_taken:.2f}s.")
+
return ReplicaStartupStatus.SUCCEEDED, None
@property

From this PR:

❯ RAY_SERVE_CONTROL_LOOP_INTERVAL_S=2 serve run raw_config.yaml
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,908 controller 3251490 -- Replica Replica(id='ykzh0lp3', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,908 controller 3251490 -- Replica Replica(id='wj1jwbst', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='cob2l0ts', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='luh8ykjt', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='8vsm1wg7', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='ax8lcckm', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,910 controller 3251490 -- Replica Replica(id='rdhkqk0q', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,910 controller 3251490 -- Replica Replica(id='888udqd7', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,910 controller 3251490 -- Replica Replica(id='15e088r3', deployment='DITest', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,911 controller 3251490 -- Replica Replica(id='4r4b7on4', deployment='DITest', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,911 controller 3251490 -- Replica Replica(id='qq5kzseb', deployment='DITest', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,911 controller 3251490 -- Replica Replica(id='iffoha2e', deployment='DITest', app='default') started in 4.08s.

From master:

Hmm that's a pretty significant increase. Is there a way to avoid it?

+2 additional seconds to start the replica is because I set `RAY_SERVE_CONTROL_LOOP_INTERVAL_S=2`, so deferring rank assignment to the next controller iteration adds roughly one extra interval. The other option I can think of to start the replica in the same controller iteration is to use

without the constant
Summary
Modified replica rank assignment to defer rank allocation until the replica is actually allocated, rather than assigning it during the startup call. This is needed for adding node-local rank in the future: to support node rank and node-local rank, we need to know the node_id, which is only known after the replica is allocated.
Changes
- Changed `start()` method signature to accept `assign_rank_callback` instead of a pre-assigned `rank` parameter
- Rank is now assigned after `_allocated_obj_ref` is resolved, ensuring the replica is allocated before rank assignment
- Pass rank to the `initialize_and_get_metadata()` method on the replica actor, allowing rank to be set during initialization
- Updated `ReplicaBase.initialize()` to accept rank as a parameter and set it along with the internal replica context
- Added `PENDING_INITIALIZATION` status check to handle cases where `_ready_obj_ref` is not yet set

Next PR #58479
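To tie the list above together, here is a hedged end-to-end sketch of the deferred flow. `assign_rank_callback` and `initialize_and_get_metadata(rank=...)` are named in the changes; the rank-manager bookkeeping (`RankManagerSketch`) and the `start_replica` glue are hypothetical, added only to show the ordering.

```python
import ray


class RankManagerSketch:
    """Hypothetical controller-side bookkeeping for global ranks."""

    def __init__(self):
        self._assigned = set()

    def assign_rank_callback(self) -> int:
        # Invoked only after the replica actor is allocated, so a future
        # version could also derive node_rank/local_rank from the node_id.
        rank = next(i for i in range(len(self._assigned) + 1)
                    if i not in self._assigned)
        self._assigned.add(rank)
        return rank


@ray.remote
class ReplicaActorSketch:
    """Stand-in for the real replica actor; only shows the rank hand-off."""

    def initialize_and_get_metadata(self, rank: int) -> dict:
        # The rank now arrives with initialization instead of at start() time.
        self._rank = rank
        return {"rank": rank}


def start_replica(manager: RankManagerSketch):
    actor = ReplicaActorSketch.remote()    # allocation requested
    # ...once the allocation object ref resolves (replica placed on a node):
    rank = manager.assign_rank_callback()  # deferred rank assignment
    return actor.initialize_and_get_metadata.remote(rank=rank)
```

Running the sketch also requires `ray.init()`; the point is only the ordering: allocate first, then assign the rank, then pass it into initialization, which is what makes node-aware ranks possible in the follow-up PR.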